package org.unidata.mdm.data.service.job.fraction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.batch.core.StepExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.service.job.JobCommonParameters;
import org.unidata.mdm.core.type.annotation.JobRef;
import org.unidata.mdm.core.type.model.RegisterElement;
import org.unidata.mdm.data.context.UpsertRelationRequestContext;
import org.unidata.mdm.data.context.UpsertRelationRequestContext.UpsertRelationHint;
import org.unidata.mdm.data.context.UpsertRelationsRequestContext;
import org.unidata.mdm.data.context.UpsertRequestContext;
import org.unidata.mdm.data.dao.RelationsDAO;
import org.unidata.mdm.data.service.segments.relations.batch.RelationsUpsertConnectorExecutor;
import org.unidata.mdm.data.service.segments.relations.batch.RelationsUpsertFinishExecutor;
import org.unidata.mdm.data.service.segments.relations.batch.RelationsUpsertPersistenceExecutor;
import org.unidata.mdm.data.service.segments.relations.batch.RelationsUpsertProcessExecutor;
import org.unidata.mdm.data.service.segments.relations.batch.RelationsUpsertStartExecutor;
import org.unidata.mdm.data.service.segments.relations.upsert.RelationUpsertFinishExecutor;
import org.unidata.mdm.data.service.segments.relations.upsert.RelationUpsertIndexingExecutor;
import org.unidata.mdm.data.service.segments.relations.upsert.RelationUpsertStartExecutor;
import org.unidata.mdm.data.service.segments.relations.upsert.RelationUpsertTimelineExecutor;
import org.unidata.mdm.data.type.apply.batch.impl.RecordUpsertBatchSetAccumulator;
import org.unidata.mdm.data.type.apply.batch.impl.RelationUpsertBatchSetAccumulator;
import org.unidata.mdm.meta.configuration.Descriptors;
import org.unidata.mdm.meta.type.RelativeDirection;
import org.unidata.mdm.meta.type.instance.DataModelInstance;
import org.unidata.mdm.system.service.PipelineService;
import org.unidata.mdm.system.type.batch.BatchSetAccumulator;
import org.unidata.mdm.system.type.job.ModularBatchJobFraction;
import org.unidata.mdm.system.type.pipeline.Pipeline;
import org.unidata.mdm.system.type.pipeline.connection.PipelineConnection;

@JobRef("reindexDataJob")
@Component("reindexRelationsJobFraction")
public class ReindexRelationsJobFraction implements ModularBatchJobFraction {
    /**
     * Name of the only step this fraction reacts to in {@link #before(String, StepExecution)}.
     */
    private static final String DATA_STEP_NAME = "reindexDataJobDataStep";
    /**
     * Name of the job parameter / step execution context entry, that turns relations reindexing on.
     */
    private static final String PARAM_REINDEX_RELATIONS = "reindexRelations";
    /**
     * Name of the job parameter, whose boolean value is passed to relation contexts as 'skipIndexDrop'.
     */
    private static final String PARAM_CLEAN_INDEXES = "cleanIndexes";
    /**
     * Value, passed to relation upsert contexts as 'skipIndexDrop'.
     * Taken from the 'cleanIndexes' job parameter.
     */
    private boolean skipDrop;
    /**
     * Job operation id.
     */
    private String operationId;
    /**
     * Data model instance, used to resolve registers by entity name.
     */
    private DataModelInstance instance;
    /**
     * Relations DAO.
     */
    @Autowired
    private RelationsDAO relationsDao;
    /**
     * The MMS.
     */
    @Autowired
    private MetaModelService metaModelService;
    /**
     * PLS.
     */
    @Autowired
    private PipelineService pipelineService;
    /**
     * The fraction is unused, i. e. relations reindexing was not requested.
     */
    private boolean unused;
    /**
     * Constructor.
     */
    public ReindexRelationsJobFraction() {
        super();
    }
    /**
     * {@inheritDoc}
     * <p>
     * Reads the fraction's state from the step, if the step is the data step of the job.
     * Calls for other step names leave the state untouched.
     */
    @Override
    public void before(String stepName, StepExecution step) {

        if (!DATA_STEP_NAME.equals(stepName)) {
            return;
        }

        // If either one is true, rels will be reindexed
        boolean jobReindexRelations = BooleanUtils.toBoolean(
                step.getJobParameters().getString(PARAM_REINDEX_RELATIONS, Boolean.FALSE.toString()));
        boolean stepReindexRelations = BooleanUtils.toBoolean(
                step.getExecutionContext().getString(PARAM_REINDEX_RELATIONS, Boolean.FALSE.toString()));

        unused = !jobReindexRelations && !stepReindexRelations;
        if (!unused) {
            skipDrop = BooleanUtils.toBoolean(step.getJobParameters().getString(PARAM_CLEAN_INDEXES, "false"));
            operationId = step.getJobParameters().getString(JobCommonParameters.PARAM_OPERATION_ID);
            instance = metaModelService.instance(Descriptors.DATA);
        }
    }
    /**
     * {@inheritDoc}
     * <p>
     * Returns the batch relations upsert pipeline, attached via the relations connector segment,
     * or null, if the fraction is unused.
     */
    @Override
    public PipelineConnection connect(String stepName, StepExecution stepExecution) {

        if (unused) {
            return null;
        }

        return PipelineConnection.single()
                .pipeline(Pipeline.start(pipelineService.start(RelationsUpsertStartExecutor.SEGMENT_ID))
                    .with(pipelineService.point(RelationsUpsertProcessExecutor.SEGMENT_ID))
                    .with(pipelineService.point(RelationsUpsertPersistenceExecutor.SEGMENT_ID))
                    .end(pipelineService.finish(RelationsUpsertFinishExecutor.SEGMENT_ID)))
                .segment(pipelineService.connector(RelationsUpsertConnectorExecutor.SEGMENT_ID))
                .build();
    }
    /**
     * {@inheritDoc}
     * <p>
     * Builds a relations upsert context for each record context from the working copy
     * and charges the relations fragment of the record accumulator with the result.
     * The supplied accumulator is cast to {@link RecordUpsertBatchSetAccumulator}.
     */
    @Override
    public void accumulate(String stepName, StepExecution stepExecution, BatchSetAccumulator<?, ?> bs) {

        if (unused) {
            return;
        }

        RecordUpsertBatchSetAccumulator bsa = (RecordUpsertBatchSetAccumulator) bs;
        List<UpsertRelationsRequestContext> payload = new ArrayList<>(bsa.workingCopy().size());

        // We just know, that entity name is set
        // For ALL entity names can be different in the same block/partition
        for (UpsertRequestContext ctx : bsa.workingCopy()) {

            UpsertRelationsRequestContext result = append(ctx);
            if (Objects.nonNull(result)) {
                payload.add(result);
            }
        }

        charge(bsa, payload);
    }
    /**
     * Charges the relations fragment of the record accumulator with the payload.
     * The fragment is created and registered on the carrier first, if it does not exist yet.
     * Does nothing for empty payload.
     *
     * @param bsa the record accumulator (the carrier)
     * @param payload relation contexts to process
     */
    private void charge(RecordUpsertBatchSetAccumulator bsa, List<UpsertRelationsRequestContext> payload) {

        if (CollectionUtils.isEmpty(payload)) {
            return;
        }

        RelationUpsertBatchSetAccumulator relbsa = bsa.fragment(RelationUpsertBatchSetAccumulator.ID);
        if (Objects.isNull(relbsa)) {

            // Our fragment pipeline
            Pipeline p = Pipeline.start(pipelineService.start(RelationUpsertStartExecutor.SEGMENT_ID))
                    .with(pipelineService.point(RelationUpsertTimelineExecutor.SEGMENT_ID))
                    .with(pipelineService.point(RelationUpsertIndexingExecutor.SEGMENT_ID))
                    .end(pipelineService.finish(RelationUpsertFinishExecutor.SEGMENT_ID));

            relbsa = new RelationUpsertBatchSetAccumulator(bsa.workingCopy().size(), false, false);
            relbsa.setPipeline(p);

            // Add to carrier
            bsa.fragment(relbsa);
        }

        relbsa.charge(payload);
    }
    /**
     * Builds a relations upsert context for a record context.
     * Relation etalon ids are loaded for the FROM side, if the register has outgoing relations,
     * and for the TO side, if the register has incoming relations.
     * Contexts of both sides are put to the same map, keyed by relation name.
     * Each context carries its side as the HINT_PROCESSING_SIDE hint.
     *
     * @param ctx the record context
     * @return relations context or null, if the register is unknown or no relations were found
     */
    private UpsertRelationsRequestContext append(UpsertRequestContext ctx) {

        RegisterElement el = instance.getRegister(ctx.getEntityName());
        if (Objects.isNull(el)) {
            return null;
        }

        Map<String, List<UpsertRelationRequestContext>> result = new HashMap<>();
        if (MapUtils.isNotEmpty(el.getOutgoingRelations())) {

            Map<String, List<UUID>> fromIds
                = relationsDao.loadMappedRelationEtalonIds(
                        UUID.fromString(ctx.getEtalonKey()), Collections.emptyList(), RelativeDirection.FROM);

            collect(fromIds, RelativeDirection.FROM, result);
        }

        if (MapUtils.isNotEmpty(el.getIncomingRelations())) {

            // NOTE(review): the names filter is null here, but an empty list for the FROM side above.
            // Presumably the DAO treats both as 'no filter' - confirm against RelationsDAO.
            Map<String, List<UUID>> toIds
                = relationsDao.loadMappedRelationEtalonIds(
                        UUID.fromString(ctx.getEtalonKey()), null, RelativeDirection.TO);

            collect(toIds, RelativeDirection.TO, result);
        }

        if (MapUtils.isEmpty(result)) {
            return null;
        }

        return UpsertRelationsRequestContext.builder()
                .relationsFrom(result)
                .build();
    }
    /**
     * Turns loaded relation etalon ids into upsert contexts and merges them into the target map.
     * Contexts are appended to the list, already collected for the same relation name,
     * so the TO side does not replace the FROM side of a relation, which has the record on both sides.
     *
     * @param ids relation etalon ids, keyed by relation name, may be null or empty
     * @param side the side, the record takes in the loaded relations
     * @param target the map to merge contexts into
     */
    private void collect(Map<String, List<UUID>> ids, RelativeDirection side,
            Map<String, List<UpsertRelationRequestContext>> target) {

        if (MapUtils.isEmpty(ids)) {
            return;
        }

        ids.forEach((name, etalonIds) ->
            target.computeIfAbsent(name, key -> new ArrayList<>())
                .addAll(etalonIds.stream()
                    .map(id ->
                        UpsertRelationRequestContext.builder()
                            .relationEtalonKey(id.toString())
                            .relationName(name)
                            .recalculateWholeTimeline(true)
                            .skipIndexDrop(skipDrop)
                            .operationId(operationId)
                            .hint(UpsertRelationHint.HINT_PROCESSING_SIDE, side)
                            .build()
                    )
                    .collect(Collectors.toList())));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public int order() {
        return 10;
    }
}
